查看原文
其他

Filebeat自定义pipeline,完美处理自定义日志字段

运维研习社 运维研习社 2022-11-05

filebeat是本地文件日志数据采集器,通常用作ELK中的日志采集,将采集的日志数据传输到elasticsearch,当需要进行数据处理时,先传入logstash,经过logstash处理后再存入elasticsearch


当filebeat收集的日志量大,传输到elasticsearch来不及处理时,需要先传到kafka或redis队列,再存入elasticsearch


这是目前很多ELK的架构,但现在的filebeat,已经不在只是日志收集工具,它内置了很多模块,可以做到日志收集、解析整个过程,目前支持的日志模块已经覆盖很全了

基本流行的中间件、应用日志,都有覆盖到


所以对于基础日志,现在完全可以用filebeat进行数据解析,而不需要自己再去logstash写匹配


filebeat可以将数据进行预处理,然后输出到elasticsearch。filebeat对数据的解析,都是通过ingest节点进行预处理,filebeat中默认定义了ingest的pipeline

这些pipeline,都在filebeat安装后的/usr/share/filebeat/modules各个模块下进行定义


如果你的日志格式是默认的,没有进行自定义,那么filebeat自带的pipeline解析完全够用,并且处理的很好,比如nginx日志处理后的字段

截图中只是很少一部分,在filebeat的nginx模块中包括了http、host、agent、source等所有字段的解析,如果使用的是默认的日志格式,完全不需要你手动解析,可以通过filebeat的解析,组合出一套炫酷的Dashboard


但是,很多时候默认的模板无法满足我们的需求,我们需要添加一些字段,或者一些自定义的字段,这个时候,我们需要去手动修改pipeline,或者手动写一个pipeline,添加对应的procesors来处理对应的日志字段


举例来说,通常nginx前面我们会添加CDN,这样在nginx日志中,通过remote ip获取到的是CDN节点的IP,而不是真实客户端的IP,这个IP对于我们分析日志来说完全没有意义的,我们需要获取真实客户端IP,在nginx的日志格式中,通常通过http_x_forwarded_for来获取代理ip的列表,所以在pipeline中需要添加grok来进行匹配这个字段,获取真实客户端IP


这样,我通过grok处理message字段的时候,将message字段中的http_x_forwarded_for匹配为nginx.access.xff,这个自己定义,这个后面要用到,在kibana中map里面展示用户请求的map的时候,是通过geo.location来定位用户位置的,而这个geo.location就是通过ip匹配geoip数据库来获取坐标实现定位的


原先的geoip处理器是通过source.ip字段获取IP地址,在GeoLite2数据库中查询的,而source.ip是通过grok处理source.address得到的,source.address是匹配$remote_host得来的,所以这里获取到的,其实就是上一层代理的IP,对于我来说,就是CDN节点的IP地址,所以在map中展示的位置就那么几个CDN节点数据中心的位置,而且计算出的UV也是有问题的


所以这里需要修改geoip处理的field,改为使用刚才grok处理过的nginx.access.xff来作为用户真实ip地址进行解析,这样在kibana中添加map的时候,获取到的才是真实的用户地址


完成pipeline的修改,或者编写后,保存为json文件,需要将pipeline PUT到elasticsearch

从kibana上通过的dev tools查看该pipeline

接着修改manifest.yml,修改nginx模块调用的pipeline


修改完成后,重启filebeat,可以从filebeat日志中看到加载的pipeline已经更改了


接着可以查看elasticsearch中的数据了

可以看到nginx.access.xff对应的是http_x_forward-for对应的客户端真实IP,而geoip获取到的信息也是真实IP对应的信息,cdn的IP地址是深圳的,而客户真实IP是在广州的,也可以通过坐标反差可以确认


这样就通过修改filebeat的pipeline,新增或修改日志字段,这里顺便加了nginx的request_time和upstream_response_time,可以通过kibana的Timelion来创建nginx响应时间的实时统计图,以此来监测nginx性能


filebeat支持的pipeline的处理器很多,可以通过官网文档查看

https://www.elastic.co/guide/en/elasticsearch/reference/7.6/ingest-processors.html


不管是用logstash还是用filebeat,比较麻烦的地方是写grok,在kibana的dev tools中提供了grok debugger调试工具,方便调试,调试完成后,就可以自定义pipeline,随意处理日志


但是grok有性能问题,如果日志量大的话,不建议这么做,不过话说回来,日志量大的话,也就不会直接filebeat传es了


Linux下time命令进行性能分析 几种方法有效屏蔽国外恶意IP扫描 懒人玩docker必备 



更多精彩内容请扫描下方二维码关注公众号


扫描二维码关注我们吧



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存